Our goal with this notebook is to explore the Titanic dataset and train two classifiers that let us determine whether a given passenger was likely to survive based on their characteristics (e.g. age, gender, class). You can run this locally, but the same code also runs in a distributed fashion since we use the RDD abstraction from Apache Spark.
With the IBM Spark Kernel we already have a default Spark context available (the sc variable), so let's start by loading the file and inspecting our data.
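If you are running this outside the kernel, as a standalone Spark application, you would need to create the context yourself. Here is a minimal sketch; the app name and local master URL are just placeholder assumptions:
import org.apache.spark.{SparkConf, SparkContext}
// Only needed outside the IBM Spark Kernel: build the SparkContext by hand
val conf = new SparkConf().setAppName("titanic-notebook").setMaster("local[*]")
val sc = new SparkContext(conf)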
In [1]:
val rawRdd = sc.textFile("datasets/COUNT/titanic.csv")
In [2]:
rawRdd.count()
Out[2]:
In [3]:
rawRdd.take(5)
Out[3]:
As we can see above, our dataset contains a header line. Let's get rid of it and keep only the data points.
In [4]:
val header = rawRdd.first()
val dataRdd = rawRdd.filter( _ != header)
Let's verify that our new dataset no longer has the header.
In [5]:
dataRdd.first()
Out[5]:
We can also look at other data points at random. Try changing the last parameter from 0L to 3L; it is just a random seed.
In [6]:
dataRdd.takeSample(false, 5, 0L)
Out[6]:
In order to play with machine learning models we need a numerical representation of our data, so we need to translate our data points into feature vectors. You can think of a feature vector as just a list of numbers, where every number is a feature, i.e. an encoding of part of the data. Let's first process our data: keep in mind that up to this point each record in our dataRdd object is a single string, so we need to split it into "columns".
In [7]:
val rowsRdd = dataRdd.map(line => line.split(",").map(_.trim))
In [8]:
rowsRdd.take(2)
Let's create a function to convert data points to feature vectors. We need to feed LabeledPoint
objects to our machine learning models.
In [9]:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
def toVector( row : Array[String] ) : LabeledPoint = {
  // row(1) is the class column (e.g. "1st class", still wrapped in quotes after the split):
  // charAt(1) picks the digit, which we map to 0.0, 1.0 or 2.0
  val klass = row(1).charAt(1) - '0'.toDouble - 1
  // age: 1 = adult, 0 = child
  val age = if (row(2).contains("adults")) 1 else 0
  // sex: 1 = woman, 0 = man
  val sex = if (row(3).contains("women")) 1 else 0
  // label: 1 = survived, 0 = did not survive
  val survived = if (row(4).contains("yes")) 1 else 0
  LabeledPoint(survived, Vectors.dense(klass, age, sex))
}
We apply our newly defined function to every row we have.
In [10]:
val vectorsRdd = rowsRdd.map(row => toVector(row))
Now we can check that our feature vectors were created correctly. Refer to our toVector
function for the mapping.
Try reading one: the first instance below would read as a person in "3rd class" who was an "adult" and a "woman" and did "not survive": (0.0,[2.0,1.0,1.0]).
In [11]:
vectorsRdd.takeSample(false, 5, 0)
Out[11]:
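As a reading aid, here is a small helper that is not part of the original notebook; it inverts the encoding used in toVector and turns a LabeledPoint back into words:
// Hypothetical helper: decode a LabeledPoint into a readable description,
// using the same encoding as toVector (class 0-2, age 1=adult, sex 1=woman)
def describe(p: LabeledPoint): String = {
  val klass = Array("1st class", "2nd class", "3rd class")(p.features(0).toInt)
  val age = if (p.features(1) == 1.0) "adult" else "child"
  val sex = if (p.features(2) == 1.0) "woman" else "man"
  val outcome = if (p.label == 1.0) "survived" else "did not survive"
  s"$klass, $age, $sex -> $outcome"
}
println(describe(LabeledPoint(0.0, Vectors.dense(2.0, 1.0, 1.0))))  // 3rd class, adult, woman -> did not survive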
Finally, let's split the data, allocating 70% for training and 30% for testing.
In [12]:
val splits = vectorsRdd.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
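As a quick sanity check (this cell is not in the original notebook), we can cache both splits, since they are reused below, and verify the rough 70/30 proportions:
// Cache both splits (they are reused for training and evaluation)
trainingData.cache()
testData.cache()
println(s"training points: ${trainingData.count()}, test points: ${testData.count()}")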
We will start by training a decision tree model, which is popular these days, although there are cases where the structure of the data might benefit from other supervised algorithms. For our case we set numClasses
to 2 since we are only interested in a survived (1.0) or not survived (0.0) prediction. In categoricalFeaturesInfo
we specify, for each feature, how many distinct values it can take. Thus the first entry of the Map says that feature 0 can take 3 different values (1st, 2nd or 3rd class).
In [13]:
import org.apache.spark.mllib.tree.DecisionTree
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]((0,3), (1,2), (2,2))
val impurity = "gini"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
Now that we have trained the model, let's make predictions.
In [14]:
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
Notice that we used only the features to predict. Below we compute the error rate and print the learned tree. You can see that the first-level split is based on sex, and in many of the inner branches it looks like, if you were a woman, our model is more likely to predict 1.0 (survived).
In [15]:
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification tree model:\n" + model.toDebugString)
Now let's have some fun: we can make up scenarios and see how our model classifies each data point. Remember we have to talk to our model using LabeledPoint
objects, so we create instances of the form:
LabeledPoint(survived, Vectors.dense(klass,age,sex))
The first 3 test passengers are adult men in 1st, 2nd and 3rd class. The last one is a girl in 1st class.
In [36]:
val testPassenger1 = LabeledPoint(0.0, Vectors.dense(0.0, 1.0, 0.0))  // adult man, 1st class
val testPassenger2 = LabeledPoint(0.0, Vectors.dense(1.0, 1.0, 0.0))  // adult man, 2nd class
val testPassenger3 = LabeledPoint(0.0, Vectors.dense(2.0, 1.0, 0.0))  // adult man, 3rd class
val testPassenger4 = LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 1.0))  // girl (child, woman), 1st class
In [37]:
println(model.predict(testPassenger4.features))
It seems that if you were a girl in 1st class our model says you were likely to survive. On the other hand, if you were a man, regardless of class, your chances were not that good.
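To see the model's output for all four hypothetical passengers at once, you can run a quick loop like the following sketch (not in the original notebook):
// Run the trained decision tree over each hypothetical passenger defined above
Seq(testPassenger1, testPassenger2, testPassenger3, testPassenger4).foreach { p =>
  println(s"${p.features} -> prediction: ${model.predict(p.features)}")
}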
Now let's train another model, for comparison purposes, using the same trainingData
object we created when we split the data above.
In [32]:
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
val numIterations = 100
val linearRegressionModel = LinearRegressionWithSGD.train(trainingData, numIterations)
In [34]:
// Compute raw scores on the test set.
val scoreAndLabels = testData.map { point =>
val score = linearRegressionModel.predict(point.features)
(score, point.label)
}
In [36]:
// Get evaluation metrics.
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Area under ROC = " + auROC)
Notice we got an area under the ROC curve of about 0.79; the ROC curve is just another way to evaluate how well the model separates survivors from non-survivors across score thresholds.
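If you want an error rate directly comparable to the decision tree's, one option is to turn the raw regression scores into hard 0/1 predictions. This sketch is not part of the original notebook, and the 0.5 cutoff is an assumption:
// Threshold the regression scores at 0.5 (assumed cutoff) to get 0/1 predictions,
// then compute a test error comparable to the decision tree's
val lrTestErr = scoreAndLabels.map { case (score, label) =>
  (if (score >= 0.5) 1.0 else 0.0, label)
}.filter { case (pred, label) => pred != label }.count.toDouble / testData.count()
println("Linear model test error (0.5 threshold) = " + lrTestErr)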